package com.atguigu.app.dim;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.DimSinkFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.app.func.DimTableProcessFunction;
import com.atguigu.utils.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * DIM-layer job. It consumes change records from the Kafka {@code topic_db} topic and reads a
 * configuration table from MySQL via Flink CDC. It broadcasts that configuration, uses it (in
 * {@link DimTableProcessFunction}) to process/filter the main stream, and writes the result
 * through {@link DimSinkFunction}.
 *
 * <p>Data flow: web/app -> MySQL (binlog) -> Maxwell -> Kafka (ODS) -> FlinkApp -> Phoenix
 *
 * <p>Programs: Mock -> MySQL (binlog) -> Maxwell -> Kafka (ZK) -> DimApp (HDFS, ZK, HBase)
 * -> Phoenix (DIM)
 */
public class DimApp {

    public static void main(String[] args) throws Exception {

        //TODO 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpointing is currently disabled. For production the job should be started from a
        // checkpoint or savepoint; the intended configuration is kept below for reference.
        // 1.1 Enable checkpointing every 5 s with exactly-once semantics
        //env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 1.2 Checkpoint timeout of 1 minute
        //env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 1.3 Minimum pause between the end of one checkpoint and the start of the next
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 1.4 Failure-rate restart strategy: at most 3 failures per day, 1 minute between restarts
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //        3, Time.days(1L), Time.minutes(1L)
        //));
        // 1.5 State backend and checkpoint storage location
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage(
        //      "hdfs://hadoop102:8020/flinkCDC"
        //);
        // 1.6 User name used to access HDFS
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        //TODO 2. Consume the Kafka topic_db topic
        DataStreamSource<String> kafkaDS = env.addSource(KafkaUtil.getFlinkKafkaConsumer("topic_db", "dim_app_220718"));

        //TODO 3. Drop unusable records and convert the rest to JSON objects
        SingleOutputStreamOperator<JSONObject> jsonObjDS = parseJson(kafkaDS);

        //TODO 4. Read the configuration table with Flink CDC
        DataStreamSource<String> mysqlDS = env.fromSource(buildConfigSource(),
                WatermarkStrategy.noWatermarks(),
                "MysqlSource");

        //TODO 5. Turn the configuration stream into a broadcast stream
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastDS = mysqlDS.broadcast(mapStateDescriptor);

        //TODO 6. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastDS);

        //TODO 7. Process (filter) the main stream according to the broadcast configuration
        SingleOutputStreamOperator<JSONObject> hbaseDS = connectedStream.process(new DimTableProcessFunction(mapStateDescriptor));

        //TODO 8. Write the data out to Phoenix (also printed to stdout for debugging)
        hbaseDS.print(">>>>>>>>>>>>");
        hbaseDS.addSink(new DimSinkFunction());

        //TODO 9. Launch the job
        env.execute("DimApp");
    }

    /**
     * Parses each raw Kafka record into a {@link JSONObject}, dropping records that cannot be used
     * downstream.
     *
     * <p>A record is dropped when:
     * <ul>
     *   <li>it is {@code null};</li>
     *   <li>fastjson fails to parse it as a JSON object (the error and the record are printed);</li>
     *   <li>fastjson returns {@code null} for it (e.g. blank input). Without this check a
     *       {@code null} element would be forwarded to the broadcast process function.</li>
     * </ul>
     *
     * <p>Only the parse call is guarded by the {@code try}. An exception raised while emitting a
     * record therefore propagates to Flink instead of being mis-reported as "non-JSON data".
     *
     * @param rawDS stream of raw record strings
     * @return stream of successfully parsed, non-null JSON objects
     */
    private static SingleOutputStreamOperator<JSONObject> parseJson(DataStreamSource<String> rawDS) {
        return rawDS.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                if (value == null) {
                    // NOTE(review): presumably empty/tombstone Kafka messages arrive as null here;
                    // confirm against KafkaUtil's deserialization schema.
                    return;
                }
                JSONObject jsonObject;
                try {
                    jsonObject = JSON.parseObject(value);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.println("非JSON格式数据：" + value);
                    return;
                }
                if (jsonObject != null) {
                    out.collect(jsonObject);
                }
            }
        });
    }

    /**
     * Builds the Flink CDC source for the configuration database {@code gmall-220718-config}.
     *
     * <p>Uses {@link StartupOptions#initial()}, which takes an initial snapshot and then continues
     * reading the binlog. Change records are emitted as Debezium JSON strings.
     *
     * @return the configured MySQL CDC source
     */
    private static MySqlSource<String> buildConfigSource() {
        return MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                // NOTE(review): credentials are hard-coded in source; consider externalising them.
                .username("root")
                .password("000000")
                .databaseList("gmall-220718-config")
                // NOTE(review): no table is listed here. Confirm this captures exactly the intended
                // configuration table(s), or pass "gmall-220718-config.<table>" explicitly.
                .tableList()
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }

}
